Voicenote Storage: Optimize audio handling #815
Conversation
📝 Walkthrough

This PR extends the LLM audio pipeline to persist large audio payloads to S3 and serve them via presigned URLs. Audio inputs and outputs are uploaded on-demand, with URI-only representations stored in the database while clients receive short-lived download links.

Changes: Large Audio Asset Storage and Presigned URL Serving Pipeline
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant Jobs
    participant ResolveInput
    participant S3Upload
    participant ChainExecutor
    Client->>Jobs: AudioInput (base64 or url)
    Jobs->>ResolveInput: resolve format
    ResolveInput-->>Jobs: audio bytes
    Jobs->>S3Upload: upload bytes
    S3Upload-->>Jobs: s3:// URI
    Jobs->>Jobs: update DB input to URI-only
    Jobs->>Jobs: send to LLM provider
    Jobs-->>Jobs: receive TTS audio (base64)
    Jobs->>S3Upload: upload TTS bytes
    S3Upload-->>Jobs: s3:// URI
    Jobs->>Jobs: store URI-only in DB
    Jobs->>Jobs: generate presigned URL
    Jobs->>ChainExecutor: final result with presigned URL
    ChainExecutor->>ChainExecutor: _resolve_presigned_url
    ChainExecutor-->>Client: response with https:// presigned URL
```
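As a sketch of the presign step at the end of the diagram: the stored `s3://` URI has to be split into bucket and key before a signer can produce the short-lived `https://` link. This is a minimal, hedged illustration; the bucket/key layout shown is illustrative, not the PR's actual storage layout.

```python
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> tuple[str, str]:
    """Split an s3://bucket/key/path URI into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an s3:// URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_uri("s3://audio-bucket/orgs/1/2/audio/tts/call-123.wav")
# With boto3 this pair would feed generate_presigned_url, e.g.:
# s3.generate_presigned_url("get_object",
#     Params={"Bucket": bucket, "Key": key}, ExpiresIn=3600)
```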
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Codecov Report: ❌ Patch coverage is
Please fix the
Actionable comments posted: 4
🧹 Nitpick comments (4)
backend/app/services/llm/chain/executor.py (1)
73-73: ⚡ Quick win: Add a type hint for `output`.
`_resolve_presigned_url` is the only changed function here with an untyped parameter. Please annotate it explicitly, even if you keep the runtime `isinstance` guard. As per coding guidelines (`**/*.py`): always add type hints to all function parameters and return values in Python code.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/llm/chain/executor.py` at line 73: the `_resolve_presigned_url` method currently has an untyped parameter; add an explicit type hint for `output` (for example `output: dict[str, Any]` or `Mapping[str, Any]`, or a more specific TypedDict/Union if you have a known shape) and keep the existing return annotation (`-> None`); update imports from `typing` as needed and preserve the existing runtime `isinstance` guards inside `_resolve_presigned_url` to validate the structure at runtime while satisfying the project's static typing rules.

backend/app/core/storage_utils.py (1)
244-248: 💤 Low value: Extension fallback may mislabel non-WAV audio when MIME type is unknown.
When `mime_type` is provided but missing from `_MIME_TO_EXT` (e.g., `audio/x-m4a`, `audio/3gpp`), the filename gets `.wav` while the upload's `content_type` is set to the actual MIME. This creates a filename/Content-Type mismatch in S3. A safer fallback is to derive the extension from `mime_type` itself via `mimetypes.guess_extension`, falling back to `wav` only when MIME is absent.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/core/storage_utils.py` around lines 244-248: the filename extension fallback currently uses `_MIME_TO_EXT.get(mime_type or "", "wav")`, which mislabels audio when `mime_type` is present but not in the map. Modify the logic in the block using `_MIME_TO_EXT`, `mime_type`, `ext`, `filename`, and `upload_to_object_store` so that: if `mime_type` is truthy, try `_MIME_TO_EXT[mime_type]` first, else call `mimetypes.guess_extension(mime_type)` and strip a leading '.' if present; only fall back to "wav" when `mime_type` is falsy or `guess_extension` returns None/empty; then construct `filename` and call `upload_to_object_store` as before so the file extension matches the upload `content_type`.

backend/app/services/llm/jobs.py (1)
731-749: 💤 Low value: Minor: TTS upload duplicates the work `update_llm_call_response`'s base64 size calculation already does.
Inside the TTS branch you call `base64.b64decode(tts_output.content.value)` to upload. A few lines later in the same function the call chain reaches `update_llm_call_response`, which would also `b64decode` the same value to compute `audio_size_bytes` if `db_content` were still base64. The current code dodges that double-decode (because `db_content` is replaced with `format: "uri"`), but it's coincidental. Consider passing the decoded bytes once and threading `size_bytes` into `db_content` so both the DB record and the S3 upload use a single decode and stay consistent.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/llm/jobs.py` around lines 731-749: the TTS branch currently decodes base64 twice implicitly; instead decode once into `tts_bytes` (from `tts_output.content.value`), compute `audio_size_bytes = len(tts_bytes)`, pass the decoded bytes into `upload_audio_bytes_to_s3` (instead of re-decoding later), and thread `audio_size_bytes` into the `db_content` passed to `update_llm_call_response` so the DB record uses `size_bytes` and `db_content` can be set to the "uri" form without needing to `b64decode` again; update references (`tts_output` (AudioOutput), `tts_bytes`, `upload_audio_bytes_to_s3`, `update_llm_call_response`, `db_content`, `llm_call_id`) to ensure one decode and consistent size metadata.

backend/app/utils.py (1)
609-625: 💤 Low value: Add a `[download_audio_bytes]` log line and handle empty bodies explicitly.
Minor follow-ups on `resolve_audio_url`:
- `download_audio_bytes` has no log emission on success/failure, so when STT URL ingestion misbehaves there's nothing to correlate. Per coding guidelines, prefix any added log with `[download_audio_bytes]`.
- An empty `audio_bytes` (length 0) with no `error` is currently treated as success and written to a temp file. Worth guarding so the provider doesn't receive a 0-byte file.

As per coding guidelines: "Prefix all log messages with the function name in square brackets".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/utils.py` around lines 609-625: add a log call in `resolve_audio_url` that records the result of `download_audio_bytes` using `logger`, prefixed with "[download_audio_bytes]" (include success/failure and any error); then explicitly check for an empty `audio_bytes` (len==0) after `download_audio_bytes` returns and treat it as an error (return "", "Downloaded audio is empty" or similar) instead of writing a 0-byte temp file; update behavior in `resolve_audio_url` and reference the `download_audio_bytes` call and `logger` to locate where to insert the log and the empty-body guard.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/app/services/llm/chain/executor.py`:
- Around line 73-91: The helper _resolve_presigned_url currently mutates the
persisted Output object (changing output.content.uri), which causes the durable
s3:// reference to be overwritten; instead, create and mutate a shallow/deep
copy of the Output (or just copy its content.uri into a new Output/Content
instance), generate/set the presigned URL on that copy (or set a separate
presigned_uri field/local variable) and leave the original Output untouched;
change _resolve_presigned_url to return the copied/modified Output (or presigned
URI) and update callers (e.g., _teardown and the call site that forwards
result.response.response.output into update_llm_chain_status) to use the
returned copy for sending to clients/logs while preserving the stored result for
persistence.
In `@backend/app/services/llm/jobs.py`:
- Around line 574-624: The STT URL branch passes user-controlled
query.input.content.value directly into download_audio_bytes, exposing SSRF/DoS
risk; before calling download_audio_bytes (in the AudioInput branch in
execute_llm_call), validate the URL is HTTPS, resolve and reject
private/internal IPs, enforce max download size and safe redirect behavior (or
else rely only on a hardened download_audio_bytes that performs those checks),
and reject or log and skip on invalid URLs so upload_audio_bytes_to_s3 /
update_llm_call_input never operate on unvalidated remote resources.
- Around line 722-774: The current flow leaves db_content as the full base64
payload when any TTS S3 upload path fails; change the error paths inside the TTS
handling so that on upload failure (upload_audio_bytes_to_s3 returns falsy) or
any exception thrown by get_cloud_storage/b64decode/upload_audio_bytes_to_s3 you
replace db_content with a metadata-only record (e.g.
{"type":"audio","content":{"format":"base64-failed","value":None,"mime_type":tts_output.content.mime_type,"size_bytes":len(tts_output.content.value)
if tts_output and tts_output.content and tts_output.content.value else None}})
and log the failure, or alternatively raise/mark a job error so the caller
(execute_llm_call / update_llm_call_response) will not persist the full base64;
update the try/except and the upload falsy branch around tts_output, db_content,
get_cloud_storage, and upload_audio_bytes_to_s3 to ensure db_content is never
left pointing at the original model_dump containing the base64.
In `@backend/app/utils.py`:
- Around line 595-606: download_audio_bytes currently performs an unsafe
requests.get without URL validation, redirects handling, host resolution checks,
or streaming limits; update download_audio_bytes to (1) enforce HTTPS-only
(reuse validate_callback_url logic or call it), (2) disable automatic redirects
(requests.get(..., allow_redirects=False)) and on any 3xx response validate the
Location by resolving its hostname and re-checking against _is_private_ip before
following, or loop manually following redirects with per-hop re-resolution, (3)
resolve the request hostname to IP(s) and reject
private/loopback/link-local/reserved addresses using the existing _is_private_ip
helper, and (4) stream the response via iter_content with a running byte counter
and an enforced max bytes cap (and/or validate Content-Length before
downloading) to prevent memory/DoS; also ensure any HTTP (non-TLS) scheme is
rejected and surface clear error messages returned by download_audio_bytes so
callers like resolve_audio_url and jobs.py receive safe failures.
---
Nitpick comments:
In `@backend/app/core/storage_utils.py`:
- Around line 244-248: The filename extension fallback currently uses
_MIME_TO_EXT.get(mime_type or "", "wav") which mislabels audio when mime_type is
present but not in the map; modify the logic in the block using _MIME_TO_EXT,
mime_type, ext, filename and upload_to_object_store to: if mime_type is truthy
try _MIME_TO_EXT[mime_type] first, else call
mimetypes.guess_extension(mime_type) and strip a leading '.' if present; only
fall back to "wav" when mime_type is falsy or guess_extension returns
None/empty; then construct filename and call upload_to_object_store as before so
the file extension matches the upload content_type.
In `@backend/app/services/llm/chain/executor.py`:
- Line 73: The _resolve_presigned_url method currently has an untyped parameter;
add an explicit type hint for output (for example output: dict[str, Any] or
Mapping[str, Any], or a more specific TypedDict/Union if you have a known shape)
and keep the existing return annotation (-> None); update imports from typing as
needed and preserve the existing runtime isinstance guards inside
_resolve_presigned_url to validate the structure at runtime while satisfying the
project's static typing rules.
In `@backend/app/services/llm/jobs.py`:
- Around line 731-749: The TTS branch currently decodes base64 twice implicitly;
instead decode once into tts_bytes (from tts_output.content.value), compute
audio_size_bytes = len(tts_bytes), pass the decoded bytes into
upload_audio_bytes_to_s3 (instead of re-decoding later) and thread
audio_size_bytes into the db_content passed to update_llm_call_response so the
DB record uses size_bytes and db_content can be set to the "uri" form without
needing to b64decode again; update references: tts_output (AudioOutput),
tts_bytes, upload_audio_bytes_to_s3, update_llm_call_response, db_content and
llm_call_id to ensure one decode and consistent size metadata.
In `@backend/app/utils.py`:
- Around line 609-625: Add a log call in resolve_audio_url that records the
result of download_audio_bytes using logger and prefixed with
"[download_audio_bytes]" (include success/failure and any error), then
explicitly check for an empty audio_bytes (len==0) after download_audio_bytes
returns and treat it as an error (return "", "Downloaded audio is empty" or
similar) instead of writing a 0-byte temp file; update behavior in
resolve_audio_url and reference the download_audio_bytes call and logger to
locate where to insert the log and the empty-body guard.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: dcc84054-a87a-4955-93e8-2d6518521cd4
📒 Files selected for processing (6)
- backend/app/core/storage_utils.py
- backend/app/crud/llm.py
- backend/app/models/llm/request.py
- backend/app/services/llm/chain/executor.py
- backend/app/services/llm/jobs.py
- backend/app/utils.py
```python
def _resolve_presigned_url(self, output) -> None:
    """Swap the s3:// URI in content.uri for a presigned URL in-place.

    Non-fatal: clears uri on failure so clients don't receive a raw s3:// address.
    """
    if isinstance(output, AudioOutput) and output.content.uri:
        try:
            with Session(engine) as session:
                storage = get_cloud_storage(session, self._context.project_id)
                output.content.uri = storage.get_signed_url(
                    output.content.uri, expires_in=3600
                )
        except Exception as e:
            logger.warning(
                f"[_resolve_presigned_url] Failed to generate presigned URL: {e} | "
                f"job_id={self._context.job_id}",
                exc_info=True,
            )
            output.content.uri = None
```
Avoid mutating the persisted output when presigning.
This helper rewrites the same result.response.response.output object that _teardown() later passes to update_llm_chain_status(...). That means the chain record will persist either a 1-hour signed URL or uri=None on signing failure, instead of the durable s3:// reference. Presign a response/callback copy and leave the stored output untouched.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/app/services/llm/chain/executor.py` around lines 73 - 91, The helper
_resolve_presigned_url currently mutates the persisted Output object (changing
output.content.uri), which causes the durable s3:// reference to be overwritten;
instead, create and mutate a shallow/deep copy of the Output (or just copy its
content.uri into a new Output/Content instance), generate/set the presigned URL
on that copy (or set a separate presigned_uri field/local variable) and leave
the original Output untouched; change _resolve_presigned_url to return the
copied/modified Output (or presigned URI) and update callers (e.g., _teardown
and the call site that forwards result.response.response.output into
update_llm_chain_status) to use the returned copy for sending to clients/logs
while preserving the stored result for persistence.
```python
if (
    isinstance(query.input, AudioInput)
    and query.input.content.format in ("base64", "url")
    and llm_call_id
):
    try:
        if query.input.content.format == "url":
            stt_bytes, dl_error = download_audio_bytes(
                query.input.content.value
            )
            if dl_error or not stt_bytes:
                raise ValueError(dl_error or "Empty audio bytes from URL")
            # Rewrite to base64 in-place so the provider resolve path
            # reuses these bytes instead of issuing a second HTTP download.
            query.input.content.value = base64.b64encode(stt_bytes).decode()
            query.input.content.format = "base64"
        else:
            stt_bytes = base64.b64decode(query.input.content.value)

        storage = get_cloud_storage(session, project_id)
        subfolder_path = f"orgs/{organization_id}/{project_id}/audio/stt"
        s3_url = upload_audio_bytes_to_s3(
            storage,
            stt_bytes,
            llm_call_id,
            query.input.content.mime_type,
            subfolder_path,
        )
        if s3_url:
            stt_input_record = json.dumps(
                {
                    "type": "audio",
                    "format": "uri",
                    "mime_type": query.input.content.mime_type,
                    "size_bytes": len(stt_bytes),
                    "uri": s3_url,
                }
            )
            update_llm_call_input(session, llm_call_id, stt_input_record)
            logger.info(
                f"[execute_llm_call] STT audio uploaded to S3 | llm_call_id={llm_call_id}"
            )
        else:
            logger.warning(
                f"[execute_llm_call] STT S3 upload failed | llm_call_id={llm_call_id}"
            )
    except Exception as e:
        logger.warning(
            f"[execute_llm_call] STT S3 upload error, continuing: {e} | llm_call_id={llm_call_id}",
            exc_info=True,
        )
```
STT URL ingestion inherits the SSRF / unbounded-download risk from download_audio_bytes.
This branch passes user-supplied query.input.content.value straight into download_audio_bytes to pre-fetch bytes for S3 upload. Until download_audio_bytes validates the URL (HTTPS-only, non-private IP, redirect handling, max body size — see the comment on backend/app/utils.py), this code path is reachable as an SSRF/DoS vector via the /llm/call API.
Once download_audio_bytes is hardened, this caller benefits automatically; no change is needed here. Noting it for cross-layer traceability.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/app/services/llm/jobs.py` around lines 574 - 624, The STT URL branch
passes user-controlled query.input.content.value directly into
download_audio_bytes, exposing SSRF/DoS risk; before calling
download_audio_bytes (in the AudioInput branch in execute_llm_call), validate
the URL is HTTPS, resolve and reject private/internal IPs, enforce max download
size and safe redirect behavior (or else rely only on a hardened
download_audio_bytes that performs those checks), and reject or log and skip on
invalid URLs so upload_audio_bytes_to_s3 / update_llm_call_input never operate
on unvalidated remote resources.
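The host-resolution check the prompt asks for can be sketched with the standard library alone. The helper name `is_url_safe` is hypothetical; in the codebase the existing `_is_private_ip` helper would slot into the address check:

```python
import ipaddress
import socket
from urllib.parse import urlparse


def is_url_safe(url: str) -> bool:
    """HTTPS-only, and every resolved address must be public:
    reject loopback/private/link-local/reserved/multicast targets."""
    parsed = urlparse(url)
    if parsed.scheme != "https" or not parsed.hostname:
        return False
    try:
        infos = socket.getaddrinfo(parsed.hostname, None)
    except socket.gaierror:
        return False
    for info in infos:
        ip = ipaddress.ip_address(info[4][0])
        if (
            ip.is_private
            or ip.is_loopback
            or ip.is_link_local
            or ip.is_reserved
            or ip.is_multicast
        ):
            return False
    return True
```

Because redirects re-enter the network layer, this check has to run again on every redirect hop (or redirects must be disabled), as the prompt notes.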
This is done. You may recheck when pushing changes.
🧩 Analysis chain
I'll verify the hardening in download_audio_bytes before confirming.
🏁 Script executed:

```shell
#!/bin/bash
# Check the current implementation of download_audio_bytes and related helpers in utils.py
rg -n "download_audio_bytes|validate.*url|SSRF|private.*ip|max.*size|max_size|redirect|https" backend/app/utils.py -A 5 -B 2
```
Length of output: 3080
@Prajna1999, after inspecting the current code, the hardening does not appear to be in place yet for `download_audio_bytes`. Here is what the function currently looks like (line 595):

```python
def download_audio_bytes(url: str) -> tuple[bytes | None, str | None]:
    """Download audio from a public URL. Returns (bytes, error)."""
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response.content, None
```

The SSRF/DoS protections are still missing:
- ❌ No HTTPS-only enforcement
- ❌ No private/internal IP rejection
- ❌ No max body size limit
- ❌ No redirect blocking (`allow_redirects` defaults to `True`)

Notably, `_is_private_ip` and `validate_callback_url` already exist in the same file and do enforce those protections — but they are only wired to `send_callback`, not to `download_audio_bytes`.
The fix would be to reuse that existing infrastructure. For example:

```python
_MAX_AUDIO_BYTES = 25 * 1024 * 1024  # 25 MB


def download_audio_bytes(url: str) -> tuple[bytes | None, str | None]:
    """Download audio from a public URL. Returns (bytes, error)."""
    try:
        # Reuse the same SSRF-safe validation already applied to callback URLs.
        validate_callback_url(url)
    except ValueError as e:
        return None, str(e)
    try:
        with requests.Session() as session:
            session.trust_env = False
            response = session.get(
                url,
                timeout=30,
                allow_redirects=False,
                stream=True,
            )
            response.raise_for_status()
            chunks = []
            total = 0
            for chunk in response.iter_content(chunk_size=65536):
                total += len(chunk)
                if total > _MAX_AUDIO_BYTES:
                    return None, (
                        f"Audio file exceeds maximum allowed size "
                        f"({_MAX_AUDIO_BYTES} bytes)"
                    )
                chunks.append(chunk)
            return b"".join(chunks), None
    except requests.RequestException as e:
        return None, str(e)
```

Could you double-check whether the fix was pushed to the branch? It may not have made it into the latest commit that was reviewed.
🧬 Code Graph Analysis Results
-
File: backend/app/crud/jobs.py, Lines 11-49
Content:class JobCrud: def __init__(self, session: Session): self.session = session def create( self, job_type: JobType, trace_id: str | None = None, project_id: int | None = None, ) -> Job: new_job = Job(job_type=job_type, trace_id=trace_id, project_id=project_id) self.session.add(new_job) self.session.commit() self.session.refresh(new_job) return new_job def update(self, job_id: UUID, job_update: JobUpdate) -> Job: job = self.session.get(Job, job_id) if not job: raise ValueError(f"Job not found with the given job_id {job_id}") update_data = job_update.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(job, field, value) job.updated_at = now() self.session.add(job) self.session.commit() self.session.refresh(job) return job def get(self, job_id: UUID, project_id: int) -> Job | None: job = self.session.get(Job, job_id) if job is None: return None if job.project_id not in (None, project_id): return None return jobNotes:
- Provides basic CRUD for Job with create, update, get.
- Update raises ValueError if job not found.
-
File: backend/app/crud/llm.py, Lines 56-162
Summary:- create_llm_call(session, *, request, job_id, chain_id=None, project_id, organization_id, resolved_config, original_provider) -> LlmCall
- Creates an LLM call record.
- Derives input_type and output_type from the completion type (e.g., stt, tts, text/image/pdf/multimodal).
- Stores config either as a stored reference (config_id + version) or inline blob (resolved_config).
- Attaches conversation_id if present in the request.
- Returns the created LlmCall.
- Logs creation; may raise HTTPException on issues with stored config retrieval or validation.
- update_llm_call_input(session, llm_call_id, s3_uri) -> None
- Overwrites the llm_call.input with an S3 URI after STT upload.
- If the LLM call is not found, logs a warning and returns.
- update_llm_call_response(session, *, llm_call_id, provider_response_id=None, content=None, usage=None, conversation_id=None) -> LlmCall
- Updates an LLM call with provider response, usage, and optional conversation_id.
- If content is audio, computes audio_size_bytes when possible (based on base64-encoded content).
- Updates updated_at, persists, and returns the updated LlmCall.
These functions are used by execute_llm_call to persist request/response data.
- create_llm_call(session, *, request, job_id, chain_id=None, project_id, organization_id, resolved_config, original_provider) -> LlmCall
-
File: backend/app/services/llm/providers/registry.py, Lines 55-85
Content:def get_llm_provider( session: Session, provider_type: str, project_id: int, organization_id: int ) -> BaseProvider: from app.crud.credentials import get_provider_credential provider_class = LLMProvider.get_provider_class(provider_type) # e.g., "openai-native" → "openai", "claude-native" → "claude" credential_provider = provider_type.replace("-native", "") credentials = get_provider_credential( session=session, provider=credential_provider, project_id=project_id, org_id=organization_id, ) if not credentials: raise ValueError( f"Credentials for provider '{credential_provider}' not configured for this project." ) try: client = provider_class.create_client(credentials=credentials) return provider_class(client=client) except ValueError: # Re-raise ValueError for credential/configuration errors raise except Exception as e: logger.error(f"Failed to initialize {provider_type} client: {e}", exc_info=True) raise RuntimeError(f"Could not connect to {provider_type} services.")Notes:
- Resolves a provider class from type string, fetches credentials, and instantiates a client.
- Raises ValueError if credentials are missing; otherwise may raise.
-
File: backend/app/core/langfuse/langfuse.py, Lines 177-294
Summary:- Function: observe_llm_execution(session_id=None, credentials=None) -> Callable
- Returns a decorator that, when wrapping a provider's execute method,:
- Initializes Langfuse client if credentials provided; otherwise logs and proceeds.
- Creates a Langfuse trace and, if possible, a generation trace for the model.
- Calls the wrapped provider execute method and records success/failure metrics to Langfuse.
- On success, ends traces and flushes; on failure, updates traces with error details.
- Handles exceptions by delegating to the wrapped function (unless the Langfuse client fails to initialize).
-
File: backend/app/core/telemetry.py, Lines 259-295
Content (set_gen_ai_request_attributes):def set_gen_ai_request_attributes( span: trace.Span, *, provider: str, model: str, operation: str, organization_id: int | None, project_id: int | None, params: dict[str, Any] | None = None, ) -> None: """Set OTel GenAI request attributes on `span` (semantic-convention keys + kaapi ids).""" span.set_attribute("gen_ai.system", provider) span.set_attribute("gen_ai.provider.name", provider) span.set_attribute("gen_ai.operation.name", operation) if model: span.set_attribute("gen_ai.request.model", model) if organization_id is not None: span.set_attribute("kaapi.organization_id", organization_id) span.set_attribute("gen_ai.request.organization_id", organization_id) if project_id is not None: span.set_attribute("kaapi.project_id", project_id) span.set_attribute("gen_ai.request.project_id", project_id) params = params or {} for attr_key, param_key in ( ("gen_ai.request.temperature", "temperature"), ("gen_ai.request.max_tokens", "max_tokens"), ("gen_ai.request.top_p", "top_p"), ("gen_ai.request.presence_penalty", "presence_penalty"), ("gen_ai.request.frequency_penalty", "frequency_penalty"), ): if param_key in params: span.set_attribute(attr_key, params.get(param_key)) tools = params.get("tools") if tools is not None: span.set_attribute("gen_ai.request.available_tools", json.dumps(tools))Content (set_gen_ai_response_attributes):
def set_gen_ai_response_attributes( span: trace.Span, *, response: "LLMCallResponse" ) -> None: """Set OTel GenAI response attributes (usage, model) on `span`.""" usage = response.usage if usage: span.set_attribute("gen_ai.usage.input_tokens", usage.input_tokens) span.set_attribute("gen_ai.usage.output_tokens", usage.output_tokens) span.set_attribute("gen_ai.usage.total_tokens", usage.total_tokens) if getattr(usage, "reasoning_tokens", None) is not None: span.set_attribute( "gen_ai.usage.output_tokens.reasoning", usage.reasoning_tokens ) if response.response and response.response.model: span.set_attribute("gen_ai.response.model", response.response.model) -
File: backend/app/models/llm/request.py, Lines 424-458
Summary:- Class LLMCallRequest(SQLModel)
- Fields:
- query: QueryParams
- config: LLMCallConfig
- callback_url: HttpUrl | None
- include_provider_raw_response: bool
- request_metadata: dict[str, Any] | None
- Purpose:
- Represents an LLM call request, where config can be a stored config (id + version) or an ad-hoc blob.
- Notes:
- Used by execute_job and execute_llm_call to drive processing and callbacks.
-
File: backend/app/api/routes/llm.py, Lines 53-108
Summary:- Endpoint llm_call(_current_user, session, request)
- Determines project_id and organization_id from context.
- Validates callback URL if provided.
- Calls start_job(...) to schedule background LLM call.
- Fetches job info and returns immediate APIResponse with job details (possibly including callback messaging).
- Purpose:
- Initiates an LLM call as a background job and returns status-like info for polling or callback delivery.
-
File: backend/app/celery/utils.py, Lines 31-38 and 41-56
Summary:- start_llm_job(project_id, job_id, trace_id="N/A", **kwargs)
- start_llm_chain_job(project_id, job_id, trace_id="N/A", **kwargs)
- Both enqueue Celery tasks (run_llm_job / run_llm_chain_job) with trace context
- Log task start information for observability
- Purpose:
- Provide standard helpers to enqueue LLM and LLM chain jobs via Celery with tracing context.
-
File: backend/app/services/llm/chain/chain.py, Lines 82-99
Summary:- class ChainBlock.execute(self, query)
- Executes a single block by calling execute_llm_call with the block's config and context
- Returns a BlockResult
- Notes:
- Part of the chain execution flow; used by LLMChain to run blocks sequentially.
-
File: backend/app/services/llm/chain/chain.py, Lines 109-137
Summary:- class LLMChain
- execute(self, query, on_block_completed=None): runs blocks sequentially
- For each block, runs block.execute(current_query), aggregates results, and passes output to next block
- Returns final BlockResult
- Notes:
- Central orchestration for chain-based LLM tasks.
-
File: backend/app/services/llm/guardrails.py, Lines 82-169
Summary:- list_validators_config(...) -> tuple[list[dict], list[dict]]
- run_guardrails_validation(...)
- Purpose:
- Fetch guardrail validator configs and run validations on inputs/outputs
- Returns results used by apply_input_guardrails and apply_output_guardrails
-
File: backend/app/core/storage_utils.py, Lines 225-248
Summary:- upload_audio_bytes_to_s3(...)
- Uploads audio bytes to S3-like storage, returns s3:// URI on success
- Parameters include storage, audio_bytes, call_id, mime_type, prefix
- Returns s3:// URI or None on failure
-
File: backend/app/core/storage.py (referenced by get_cloud_storage) and related storage usage
Summary:- get_cloud_storage(session, project_id) -> CloudStorage
- Retrieves project to determine storage path, creates AmazonCloudStorage
- Raises HTTPException if project invalid or storage creation fails
-
File: backend/app/services/llm/chain/executor.py, Lines 26-225
Summary:- class ChainExecutor
- run(self): runs the chain via _setup, chain.execute, and _teardown
- _setup: marks parent job/chain as processing/running and stores webhook secret
- _resolve_presigned_url(output): converts s3:// URIs to presigned URLs for outputs
- _teardown(result): on success, builds callback data, calls webhook, updates statuses
- _handle_error(error): builds failure response, calls webhook, updates statuses
- _on_block_completed: aggregates usage; may trigger intermediate callbacks
- _send_intermediate_callback: sends partial results if enabled
- _handle_unexpected_error: returns a generic failure path
Notes:
- Encapsulates end-to-end lifecycle of a chain run, including callbacks and storage URL handling.
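The s3://-to-https conversion performed by `_resolve_presigned_url` maps onto boto3's `generate_presigned_url`. A minimal sketch, where the default expiry and the pass-through behavior for non-s3 values are assumptions:

```python
def resolve_presigned_url(s3_client, value: str, expires_in: int = 3600) -> str:
    """Turn an s3://bucket/key URI into a short-lived https:// download link.

    Non-s3 values (already-public URLs, plain text) pass through unchanged.
    """
    if not value.startswith("s3://"):
        return value
    bucket, _, key = value[len("s3://"):].partition("/")
    return s3_client.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=expires_in,
    )
```

This matches the PR's design of storing only URI-only representations in the database while clients receive short-lived links at response time.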
```python
def download_audio_bytes(url: str) -> tuple[bytes | None, str | None]:
    """Download audio from a public URL. Returns (bytes, error)."""
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response.content, None
    except requests.exceptions.Timeout:
        return None, f"Timed out downloading audio from URL: {url}"
    except requests.exceptions.HTTPError as e:
        return None, f"HTTP {e.response.status_code} downloading audio from URL: {url}"
    except Exception as e:
        return None, f"Failed to download audio from URL: {str(e)}"
```
Critical SSRF risk — user-supplied audio URLs are fetched without validation.
`download_audio_bytes` is invoked for `AudioContent` payloads coming directly from API callers (via `resolve_audio_url` and from `jobs.py` for STT URL inputs). With `requests.get` here:

- The URL is never validated, so a caller can point `value` at internal IPs (e.g., `127.0.0.1`, `169.254.169.254`, RFC1918 ranges), achieving SSRF against the cloud metadata service or internal control planes.
- `allow_redirects` defaults to `True`, so even an HTTPS public host can redirect to an internal address; per-hop validation is required.
- The full body is buffered into memory with `response.content` and no `Content-Length` / streaming cap, enabling a memory-exhaustion DoS via a large/zip-bomb-style response.
- HTTP (non-TLS) URLs are not rejected.
This file already implements `validate_callback_url` and `_is_private_ip` for the outbound callback path; the same protections should apply here. At minimum:

- Reject non-HTTPS schemes.
- Resolve the hostname and reject private/loopback/link-local/reserved addresses (re-resolve after each redirect, or disable redirects).
- Enforce an upper bound on downloaded bytes (`iter_content` with a running total, or check `Content-Length`).
🛡️ Sketch of a hardened implementation
```diff
-def download_audio_bytes(url: str) -> tuple[bytes | None, str | None]:
-    """Download audio from a public URL. Returns (bytes, error)."""
-    try:
-        response = requests.get(url, timeout=30)
-        response.raise_for_status()
-        return response.content, None
-    except requests.exceptions.Timeout:
-        return None, f"Timed out downloading audio from URL: {url}"
-    except requests.exceptions.HTTPError as e:
-        return None, f"HTTP {e.response.status_code} downloading audio from URL: {url}"
-    except Exception as e:
-        return None, f"Failed to download audio from URL: {str(e)}"
+MAX_AUDIO_DOWNLOAD_BYTES = 50 * 1024 * 1024  # 50 MiB
+
+def download_audio_bytes(url: str) -> tuple[bytes | None, str | None]:
+    """Download audio from a public URL with SSRF + size protection."""
+    try:
+        validate_callback_url(url)  # reuses SSRF guard (HTTPS + IP checks)
+    except ValueError as ve:
+        return None, f"Disallowed audio URL: {ve}"
+    try:
+        with requests.get(
+            url, timeout=30, stream=True, allow_redirects=False
+        ) as response:
+            response.raise_for_status()
+            content_length = response.headers.get("Content-Length")
+            if content_length and int(content_length) > MAX_AUDIO_DOWNLOAD_BYTES:
+                return None, "Audio file exceeds maximum allowed size"
+            buf = bytearray()
+            for chunk in response.iter_content(chunk_size=64 * 1024):
+                buf.extend(chunk)
+                if len(buf) > MAX_AUDIO_DOWNLOAD_BYTES:
+                    return None, "Audio file exceeds maximum allowed size"
+            return bytes(buf), None
+    except requests.exceptions.Timeout:
+        return None, f"Timed out downloading audio from URL: {url}"
+    except requests.exceptions.HTTPError as e:
+        return None, f"HTTP {e.response.status_code} downloading audio from URL: {url}"
+    except Exception as e:
+        return None, f"Failed to download audio from URL: {str(e)}"
```

Note: `validate_callback_url` currently enforces HTTPS-only; confirm that requirement matches your STT customer contract. If HTTP must be allowed, fork the check to keep the IP-range protection but relax the scheme.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/app/utils.py` around lines 595 - 606, download_audio_bytes currently
performs an unsafe requests.get without URL validation, redirects handling, host
resolution checks, or streaming limits; update download_audio_bytes to (1)
enforce HTTPS-only (reuse validate_callback_url logic or call it), (2) disable
automatic redirects (requests.get(..., allow_redirects=False)) and on any 3xx
response validate the Location by resolving its hostname and re-checking against
_is_private_ip before following, or loop manually following redirects with
per-hop re-resolution, (3) resolve the request hostname to IP(s) and reject
private/loopback/link-local/reserved addresses using the existing _is_private_ip
helper, and (4) stream the response via iter_content with a running byte counter
and an enforced max bytes cap (and/or validate Content-Length before
downloading) to prevent memory/DoS; also ensure any HTTP (non-TLS) scheme is
rejected and surface clear error messages returned by download_audio_bytes so
callers like resolve_audio_url and jobs.py receive safe failures.
Summary
Target issue is #804
Explain the motivation for making this change. What existing problem does the pull request solve?
Checklist
Before submitting a pull request, please ensure that you mark these tasks.

- Run `fastapi run --reload app/main.py` or `docker compose up` in the repository root and test.

Notes
Please add here if any other information is required for the reviewer.
Summary by CodeRabbit
New Features
Improvements